{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h1> Exploring tf.transform </h1>\n",
    "\n",
    "While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming.\n",
    "\n",
    "Only specific combinations of TensorFlow/Beam are supported by tf.transform. So make sure to get a combo that is.\n",
    "\n",
    "* TFT 0.8.0\n",
    "* TF 1.8 or higher\n",
    "* Apache Beam [GCP] 2.9.0 or higher"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "pip install apache-beam[gcp]==2.16.0 tensorflow_transform==0.15.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<b>Restart the kernel</b> after you do a pip install (click on the reload button above)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "pip freeze | grep -e 'flow\\|beam'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import tensorflow_transform as tft\n",
    "import shutil\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# change these to try this notebook out\n",
    "BUCKET = 'cloud-training-demos-ml'\n",
    "PROJECT = 'cloud-training-demos'\n",
    "REGION = 'us-central1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "gcloud config set project $PROJECT\n",
    "gcloud config set compute/region $REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! gcloud storage ls | grep -q gs://${BUCKET}/; then\n",
    "  gcloud storage buckets create --location=${REGION} gs://${BUCKET}\n",
    "fi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Input source: BigQuery\n",
    "\n",
    "Get data from BigQuery but defer filtering etc. to Beam.\n",
    "Note that the dayofweek column is now strings."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from google.cloud import bigquery\n",
    "def create_query(phase, EVERY_N):\n",
    "  \"\"\"\n",
    "  phase: 1=train 2=valid\n",
    "  \"\"\"\n",
    "  base_query = \"\"\"\n",
    "WITH daynames AS\n",
    "  (SELECT ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek)\n",
    "SELECT\n",
    "  (tolls_amount + fare_amount) AS fare_amount,\n",
    "  daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,\n",
    "  EXTRACT(HOUR FROM pickup_datetime) AS hourofday,\n",
    "  pickup_longitude AS pickuplon,\n",
    "  pickup_latitude AS pickuplat,\n",
    "  dropoff_longitude AS dropofflon,\n",
    "  dropoff_latitude AS dropofflat,\n",
    "  passenger_count AS passengers,\n",
    "  'notneeded' AS key\n",
    "FROM\n",
    "  `nyc-tlc.yellow.trips`, daynames\n",
    "WHERE\n",
    "  trip_distance > 0 AND fare_amount > 0\n",
    "  \"\"\"\n",
    "\n",
    "  if EVERY_N == None:\n",
    "    if phase < 2:\n",
    "      # training\n",
    "      query = \"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING), 4)) < 2\".format(base_query)\n",
    "    else:\n",
    "      query = \"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING), 4)) = {1}\".format(base_query, phase)\n",
    "  else:\n",
    "      query = \"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING)), {1})) = {2}\".format(base_query, EVERY_N, phase)\n",
    "    \n",
    "  return query\n",
    "\n",
    "query = create_query(2, 100000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_valid = bigquery.Client().query(query).to_dataframe()\n",
    "display(df_valid.head())\n",
    "df_valid.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create ML dataset using tf.transform and Dataflow\n",
    "\n",
    "Let's use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile requirements.txt\n",
    "tensorflow-transform==0.8.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Test transform_data is type pcollection. test if _ = is neccesary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "import tensorflow as tf\n",
    "import apache_beam as beam\n",
    "import tensorflow_transform as tft\n",
    "from tensorflow_transform.beam import impl as beam_impl\n",
    "\n",
    "def is_valid(inputs):\n",
    "  try:\n",
    "    pickup_longitude = inputs['pickuplon']\n",
    "    dropoff_longitude = inputs['dropofflon']\n",
    "    pickup_latitude = inputs['pickuplat']\n",
    "    dropoff_latitude = inputs['dropofflat']\n",
    "    hourofday = inputs['hourofday']\n",
    "    dayofweek = inputs['dayofweek']\n",
    "    passenger_count = inputs['passengers']\n",
    "    fare_amount = inputs['fare_amount']\n",
    "    return (fare_amount >= 2.5 and pickup_longitude > -78 and pickup_longitude < -70 \\\n",
    "      and dropoff_longitude > -78 and dropoff_longitude < -70 and pickup_latitude > 37 \\\n",
    "      and pickup_latitude < 45 and dropoff_latitude > 37 and dropoff_latitude < 45 \\\n",
    "      and passenger_count > 0)\n",
    "  except:\n",
    "    return False\n",
    "  \n",
    "def preprocess_tft(inputs):\n",
    "      import datetime   \n",
    "      print inputs\n",
    "      result = {}\n",
    "      result['fare_amount'] = tf.identity(inputs['fare_amount'])     \n",
    "      result['dayofweek'] = tft.string_to_int(inputs['dayofweek']) # builds a vocabulary\n",
    "      result['hourofday'] = tf.identity(inputs['hourofday']) # pass through\n",
    "      result['pickuplon'] = (tft.scale_to_0_1(inputs['pickuplon'])) # scaling numeric values\n",
    "      result['pickuplat'] = (tft.scale_to_0_1(inputs['pickuplat']))\n",
    "      result['dropofflon'] = (tft.scale_to_0_1(inputs['dropofflon']))\n",
    "      result['dropofflat'] = (tft.scale_to_0_1(inputs['dropofflat']))\n",
    "      result['passengers'] = tf.cast(inputs['passengers'], tf.float32) # a cast\n",
    "      result['key'] = tf.as_string(tf.ones_like(inputs['passengers'])) # arbitrary TF func\n",
    "      # engineered features\n",
    "      latdiff = inputs['pickuplat'] - inputs['dropofflat']\n",
    "      londiff = inputs['pickuplon'] - inputs['dropofflon']\n",
    "      result['latdiff'] = tft.scale_to_0_1(latdiff)\n",
    "      result['londiff'] = tft.scale_to_0_1(londiff)\n",
    "      dist = tf.sqrt(latdiff * latdiff + londiff * londiff)\n",
    "      result['euclidean'] = tft.scale_to_0_1(dist)\n",
    "      return result\n",
    "\n",
    "def preprocess(in_test_mode):\n",
    "  import os\n",
    "  import os.path\n",
    "  import tempfile\n",
    "  from apache_beam.io import tfrecordio\n",
    "  from tensorflow_transform.coders import example_proto_coder\n",
    "  from tensorflow_transform.tf_metadata import dataset_metadata\n",
    "  from tensorflow_transform.tf_metadata import dataset_schema\n",
    "  from tensorflow_transform.beam import tft_beam_io\n",
    "  from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "\n",
    "  job_name = 'preprocess-taxi-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    \n",
    "  if in_test_mode:\n",
    "    import shutil\n",
    "    print 'Launching local job ... hang on'\n",
    "    OUTPUT_DIR = './preproc_tft'\n",
    "    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n",
    "    EVERY_N = 100000\n",
    "  else:\n",
    "    print 'Launching Dataflow job {} ... hang on'.format(job_name)\n",
    "    OUTPUT_DIR = 'gs://{0}/taxifare/preproc_tft/'.format(BUCKET)\n",
    "    import subprocess\n",
    "    subprocess.call('gcloud storage rm --recursive {}'.format(OUTPUT_DIR).split())\n",
    "    EVERY_N = 10000\n",
    "    \n",
    "  options = {\n",
    "    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),\n",
    "    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),\n",
    "    'job_name': job_name,\n",
    "    'project': PROJECT,\n",
    "    'max_num_workers': 6,\n",
    "    'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "    'no_save_main_session': True,\n",
    "    'requirements_file': 'requirements.txt'\n",
    "  }\n",
    "  opts = beam.pipeline.PipelineOptions(flags=[], **options)\n",
    "  if in_test_mode:\n",
    "    RUNNER = 'DirectRunner'\n",
    "  else:\n",
    "    RUNNER = 'DataflowRunner'\n",
    "\n",
    "  # set up raw data metadata\n",
    "  raw_data_schema = {\n",
    "    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())\n",
    "                   for colname in 'dayofweek,key'.split(',')\n",
    "  }\n",
    "  raw_data_schema.update({\n",
    "      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())\n",
    "                   for colname in 'fare_amount,pickuplon,pickuplat,dropofflon,dropofflat'.split(',')\n",
    "    })\n",
    "  raw_data_schema.update({\n",
    "      colname : dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())\n",
    "                   for colname in 'hourofday,passengers'.split(',')\n",
    "    })\n",
    "  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))\n",
    "\n",
    "  # run Beam  \n",
    "  with beam.Pipeline(RUNNER, options=opts) as p:\n",
    "    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):\n",
    "      # save the raw data metadata\n",
    "      raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(\n",
    "            os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),\n",
    "            pipeline=p)\n",
    "      \n",
    "      # read training data from bigquery and filter rows     \n",
    "      raw_data = (p \n",
    "        | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(1, EVERY_N), use_standard_sql=True))\n",
    "        | 'train_filter' >> beam.Filter(is_valid))\n",
    "      raw_dataset = (raw_data, raw_data_metadata)\n",
    "      \n",
    "      # analyze and transform training data\n",
    "      transformed_dataset, transform_fn = (\n",
    "          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))\n",
    "      transformed_data, transformed_metadata = transformed_dataset\n",
    "      \n",
    "      # save transformed training data to disk in efficient tfrecord format\n",
    "      transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(\n",
    "          os.path.join(OUTPUT_DIR, 'train'),\n",
    "          file_name_suffix='.gz',\n",
    "          coder=example_proto_coder.ExampleProtoCoder(\n",
    "              transformed_metadata.schema))\n",
    "      \n",
    "      # read eval data from bigquery and filter rows  \n",
    "      raw_test_data = (p \n",
    "        | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(2, EVERY_N), use_standard_sql=True))\n",
    "        | 'eval_filter' >> beam.Filter(is_valid))\n",
    "      raw_test_dataset = (raw_test_data, raw_data_metadata)\n",
    "      \n",
    "      # transform eval data\n",
    "      transformed_test_dataset = (\n",
    "          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())\n",
    "      transformed_test_data, _ = transformed_test_dataset\n",
    "      \n",
    "      # save transformed training data to disk in efficient tfrecord format\n",
    "      transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(\n",
    "          os.path.join(OUTPUT_DIR, 'eval'),\n",
    "          file_name_suffix='.gz',\n",
    "          coder=example_proto_coder.ExampleProtoCoder(\n",
    "              transformed_metadata.schema))\n",
    "      \n",
    "      # save transformation function to disk for use at serving time\n",
    "      transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(\n",
    "          os.path.join(OUTPUT_DIR, 'metadata'))\n",
    "\n",
    "preprocess(in_test_mode=False) # change to True to run locally"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# ls preproc_tft\n",
    "gcloud storage ls gs://${BUCKET}/taxifare/preproc_tft/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2> Train off preprocessed data </h2>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "rm -rf taxifare_tft.tar.gz taxi_trained\n",
    "export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft\n",
    "python -m trainer.task \\\n",
    "   --train_data_paths=\"gs://${BUCKET}/taxifare/preproc_tft/train*\" \\\n",
    "   --eval_data_paths=\"gs://${BUCKET}/taxifare/preproc_tft/eval*\"  \\\n",
    "   --output_dir=./taxi_trained \\\n",
    "   --train_steps=10 --job-dir=/tmp \\\n",
    "   --metadata_path=gs://${BUCKET}/taxifare/preproc_tft/metadata"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls $PWD/taxi_trained/export/exporter"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile /tmp/test.json\n",
    "{\"dayofweek\":\"Thu\",\"hourofday\":17,\"pickuplon\": -73.885262,\"pickuplat\": 40.773008,\"dropofflon\": -73.987232,\"dropofflat\": 40.732403,\"passengers\": 2}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "model_dir=$(ls $PWD/taxi_trained/export/exporter/)\n",
    "gcloud ai-platform local predict \\\n",
    "    --model-dir=./taxi_trained/export/exporter/${model_dir} \\\n",
    "    --json-instances=/tmp/test.json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2016-2018 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
